Exposure database#
This notebook provides a systematic approach for analyzing and visualizing infrastructure networks across different European countries. It iterates through a list of countries and network types, retrieves the relevant geographic data, processes it to handle missing values, and generates visualizations to provide insights into the infrastructure distribution and characteristics within each country. Raw geospatial data is sourced from multiple databases, including OSM. Integration with OSM data is achieved using the OSMnx Python library, which simplifies downloading, modeling, analyzing, and visualizing OSM data. The integrated data is associated with EU codes representing all European countries (e.g., FR for France, NL for Netherlands).
# Define the folder where the database will be stored
# TODO: We could set a .miraca folder as a default for all packages
Miraca_Exposure_Database_path = Path("~/miraca_exposure_database").expanduser()
# Define the folder where the Geofabrik data is available
data_path = r"C:\Users\Paraskevi Tsoumani\OneDrive - Vrije Universiteit Amsterdam\Documenten - MIRACA\General\Deliverables and Milestones\Milestone_Intermediate version of harmonised exposure database\Exposure_database_new_version_2026\Data"
TENT_roads_path = data_path / "europe_road_edges_TENT.parquet"
TENT_rail_path = data_path / "europe_railways_edges_TENT.parquet"
LAU_path = data_path / "LAU_RG_01M_2024_3035.geojson"
NUTS_path = data_path / "NUTS_RG_01M_2024_3035.geojson"
DICT_CIS_OSM = {
"Roadway": {
"osm_keys": ["highway", "name", "maxspeed", "lanes", "surface","bridge","ref"],
"osm_query": {
"highway": [
"motorway",
"motorway_link",
"trunk",
"trunk_link",
"primary",
"primary_link",
"secondary",
"secondary_link",
"tertiary",
"tertiary_link",
"residential",
"road",
"unclassified",
"track",
]
},
},
"Railway": {
"osm_keys": ["railway", "name", "gauge", "electrified", "voltage"],
"osm_query": {"railway": ["rail", "narrow_gauge"]},
},
"Airports": {
"osm_keys": ["aeroway", "name"],
"osm_query": {"aeroway": ["aerodrome", "apron", "terminal", "runway"]},
},
"Ports": {
"osm_keys": ["waterway", "landuse", "man_made", "name"],
"osm_query": {
"waterway": ["harbour"],
"landuse": ["harbour"],
"man_made": ["pier"],
},
},
"Telecom": {
"osm_keys": ["man_made", "tower:type", "name"],
"osm_query": {
"man_made": ["mast", "communications_tower"],
"tower:type": ["communication"],
},
},
"Education": {
"osm_keys": ["amenity", "building", "name"],
"osm_query": {
"building": ["school", "kindergarten", "college", "university"],
"amenity": ["school", "kindergarten", "college", "university"],
},
},
"Healthcare": {
"osm_keys": ["amenity", "building", "healthcare", "name"],
"osm_query": {
"amenity": ["hospital", "clinic"],
"building": ["hospital", "clinic"],
},
},
"Power": {
"osm_keys": ["power", "voltage", "utility", "name", "source"],
"osm_query": {
"power": [
"line",
"cable",
"minor_line",
"plant",
"generator",
"substation",
"transformer",
"pole",
"portal",
"tower",
"terminal",
"switch",
"catenary_mast",
]
},
},
"Gas": {
"osm_keys": ["man_made", "pipeline", "utility", "name", "substance", "content"],
"osm_query": {
"man_made": ["gasometer", "pipeline", "storage_tank"],
"substance": ["gas", "LNG"],
"content": ["gas", "natural_gas", "LNG"],
"utility": ["gas"],
},
},
"Oil": {
"osm_keys": ["man_made", "pipeline", "amenity", "name", "substance", "content", "industrial"],
"osm_query": {
"man_made": ["petroleum_well", "oil_refinery", "pipeline", "storage_tank"],
"industrial": ["refinery", "oil"],
"substance": ["oil", "crude_oil", "petroleum", "diesel", "fuel_oil"],
"content": ["oil", "crude_oil", "petroleum", "diesel", "fuel_oil"],
"amenity": ["fuel"],
},
},
"Buildings": {
"osm_keys": ["building", "amenity", "name"],
"osm_query": {
"building": [
"yes",
"house",
"residential",
"detached",
"hut",
"industrial",
"shed",
"apartments",
]
},
},
}
OBJECTS_TO_KEEP = {
"Roadway": ["motorway", "motorway_link", "trunk", "trunk_link", "primary", "primary_link", "secondary", "secondary_link", "tertiary", "tertiary_link", "residential", "road", "unclassified", "track"],
"Railway": ["rail", "narrow_gauge"],
"Airports": ["aerodrome", "apron", "terminal", "runway"],
"Ports": ["harbour", "pier", "ferry_terminal"],
"Telecom": ["mast", "communications_tower", "communication"],
"Education": ["school", "kindergarten", "college", "university"],
"Healthcare": ["hospital", "clinic"],
"Power": ["line", "cable", "minor_line", "plant", "generator", "substation", "transformer", "pole", "portal", "tower", "terminal", "switch", "catenary_mast"],
"Gas": ["gasometer", "pipeline", "gas", "natural_gas", "LNG", 'storage_tank'],
"Oil": ["petroleum_well", "oil_refinery", "pipeline", "refinery", "oil", "crude_oil", "petroleum", "diesel", "fuel_oil", "fuel",'storage_tank'],
"Buildings": ["yes", "house", "residential", "detached", "hut", "industrial", "shed", "apartments"],
}
def _remove_contained_assets(features):
"""
Remove ONLY points contained within polygons.
Keep inner polygons (do not drop polygons contained in other polygons).
"""
features = _remove_contained_points(features)
return features
def _remove_contained_points(gdf_p_mp):
"""
Remove point features contained within any polygon in the dataset.
Args:
gdf_p_mp (gpd.GeoDataFrame): GeoDataFrame with point and polygon geometries.
Returns:
gpd.GeoDataFrame: GeoDataFrame without contained points.
"""
gdf_p_mp = gdf_p_mp.reset_index(drop=True)
ind_dupl = np.unique(
gpd.sjoin(
gdf_p_mp[gdf_p_mp.geometry.type == "Point"],
gdf_p_mp[
(gdf_p_mp.geometry.type == "MultiPolygon")
| (gdf_p_mp.geometry.type == "Polygon")
],
predicate="within",
).index
)
return gdf_p_mp.drop(index=ind_dupl).reset_index(drop=True)
def _remove_contained_polys(gdf):
"""
From a GeoDataFrame containing (multi-)polygons (and potentially other
geometries), remove those polygon entries that are already fully
contained in another polygon entries. Removes smaller polygons within
polygons and full duplicates, but leaves contained points untouched
(see remove_contained_points() for this).
Resets the index of the dataframe.
Args:
gdf (gpd.GeoDataFrame): GeoDataFrame with polygon geometries.
Returns:
gpd.GeoDataFrame: GeoDataFrame with outermost geometries.
"""
gdf = gdf.reset_index(drop=True)
contained = gpd.sjoin(
gdf[(gdf.geometry.type == "MultiPolygon") | (gdf.geometry.type == "Polygon")],
gdf[(gdf.geometry.type == "MultiPolygon") | (gdf.geometry.type == "Polygon")],
predicate="contains",
)
subset = contained[contained.index != contained.index_right]
to_drop = set(subset.index_right) - set(subset.index)
return gdf.drop(index=to_drop).reset_index(drop=True)
def extract_first_geom(geom):
"""
Extract the first geometry from a GeometryCollection.
Args:
geom (shapely.Geometry): Shapely geometry object.
Returns:
shapely.Geometry: First geometry or unchanged object.
"""
if isinstance(geom, GeometryCollection) and len(geom.geoms) > 0:
return geom.geoms[0]
return geom
def _extract_value(text, key):
"""
Parse the value of a specific key from a semi-structured OSM tag string.
Args:
text (str): Raw OSM `other_tags` string.
key (str): Key to extract value for.
Returns:
str or None: Extracted value or None.
"""
pattern = rf'"{key}"=>"([^"]+)"'
try:
match = re.search(pattern, text)
if match:
return match.group(1)
return None
except:
return None
def extract(osm_path, geom_type, osm_keys, osm_query):
"""
Extract specific infrastructure features from a .pbf file using OSM keys/values.
Args:
osm_path (str or Path): Path to .osm.pbf file.
geom_type (str): One of 'points', 'lines', 'multipolygons'.
osm_keys (list): Keys to extract from OSM file.
osm_query (dict): Key-value mapping used to filter.
Returns:
gpd.GeoDataFrame: Extracted GeoDataFrame with `object_type` field.
"""
features = gpd.read_file(osm_path, layer=geom_type, engine="pyogrio")
if "osm_way_id" in features.columns:
features["osm_id"] = features["osm_id"].fillna(features["osm_way_id"])
for key in osm_keys:
if key not in features.columns:
features[key] = features["other_tags"].apply(
lambda x: _extract_value(x, key)
)
# build query
collect_indices = []
for query_key in osm_query.keys():
collect_indices.append(
features[features[query_key].isin(osm_query[query_key])].index.values
)
# get complete list
collect_indices = functools.reduce(operator.iconcat, collect_indices, [])
# remove duplicates from list
collect_indices = list(set(collect_indices))
features = features.iloc[collect_indices]
features = features[["osm_id", "geometry"] + osm_keys]
features.rename(columns={osm_keys[0]: "object_type"}, inplace=True)
return features
def read_osm_data(osm_path, asset_type):
"""
Load and extract OSM features for a given critical infrastructure type.
Args:
osm_path (str or Path): Path to .osm.pbf file.
asset_type (str): One of the keys in DICT_CIS_OSM.
Returns:
gpd.GeoDataFrame: Cleaned and validated exposure GeoDataFrame.
Raises:
ImportWarning: If asset_type is not supported.
"""
# features consisting in points and multipolygon results:
if asset_type in ["Healthcare", "Education", "Buildings", "Ports"]:
gdf = pd.concat(
[
extract(
osm_path,
"points",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
extract(
osm_path,
"multipolygons",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
]
)
if asset_type == "Ports":
waterway = gdf["waterway"] if "waterway" in gdf.columns else pd.Series(pd.NA, index=gdf.index)
landuse = gdf["landuse"] if "landuse" in gdf.columns else pd.Series(pd.NA, index=gdf.index)
man_made = gdf["man_made"] if "man_made" in gdf.columns else pd.Series(pd.NA, index=gdf.index)
gdf["object_type"] = waterway.fillna(landuse).fillna(man_made)
# features consisting in points, multipolygons and lines:
elif asset_type in ["Gas", "Oil", "Power"]:
gdf = pd.concat(
[
extract(
osm_path,
"points",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
extract(
osm_path,
"multipolygons",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
extract(
osm_path,
"lines",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
]
)
# features consisting in multipolygons and lines:
elif asset_type in ["Airports"]:
gdf = pd.concat(
[
extract(
osm_path,
"multipolygons",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
extract(
osm_path,
"lines",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
]
)
# features consisting in multiple datattypes, but only lines needed:
elif asset_type in ["Railway", "Roadway"]:
gdf = pd.concat(
[
extract(
osm_path,
"lines",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
)
]
)
# features consisting in all data types, but only points and multipolygon needed:
elif asset_type in [
"Telecom",
]:
gdf = pd.concat(
[
extract(
osm_path,
"points",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
extract(
osm_path,
"multipolygons",
DICT_CIS_OSM[asset_type]["osm_keys"],
DICT_CIS_OSM[asset_type]["osm_query"],
),
]
)
else:
raise ImportWarning("feature not in DICT_CIS_OSM. Returning empty gdf")
# make all geometries valid
gdf["geometry"] = shapely.make_valid(gdf["geometry"])
gdf = gdf[gdf.geometry.is_valid]
# only keep assets with unique geometries
features = _remove_contained_assets(gdf)
# remove potential geometrycollections to avoid errors later on
features["geometry"] = features["geometry"].apply(extract_first_geom)
# if asset_type == "Telecom":
# communication = features["communication"] if "communication" in features.columns else pd.Series(pd.NA, index=features.index)
# tower_type = features["tower:type"] if "tower:type" in features.columns else pd.Series(pd.NA, index=features.index)
# telecom_mask = (
# communication.isin(["communication", "mobile_phone"]) |
# tower_type.isin(["communication", "telecommunication"])
# )
# features = features[telecom_mask]
# remove features that are not in the asset_type list
unique_objects_in_asset_type = OBJECTS_TO_KEEP[asset_type]
return features[features["object_type"].isin(unique_objects_in_asset_type)]
FORCE_GEOM_RULES = {
"Power": {
"catenary_mast": "Point",
"portal": "Point",
"transformer": "Point",
"switch": "Point",
"terminal": "Point",
"tower": "Point",
"pole": "Point",
},
"Telecom": {
"mast": "Point",
"communications_tower": "Point",
"communication": "Point",
}
}
def _apply_force_geom_rules(features, asset_type):
rules = FORCE_GEOM_RULES.get(asset_type, {})
if not rules:
return features
features = features.copy()
for obj_type, target in rules.items():
if target != "Point":
continue
gtypes = features.geometry.geom_type
mask = (
(features["object_type"] == obj_type) &
(gtypes.isin(["LineString", "MultiLineString", "Polygon", "MultiPolygon"]))
)
if mask.any():
def to_point(geom):
# for lines: midpoint; for polygons: representative_point
try:
if geom.geom_type in ["LineString", "MultiLineString"]:
return geom.interpolate(0.5, normalized=True)
else:
return geom.representative_point()
except Exception:
return geom.representative_point()
features.loc[mask, "geometry"] = features.loc[mask, "geometry"].apply(to_point)
return features
def convert_mixed_geometries_to_polygons(features, asset_type):
"""
Convert point and linestring geometries to polygons for asset types with mixed geometry types.
Only converts geometries for object types that have at least some polygon representations.
Respects specific object types that should maintain their original geometry.
Adds ONE output column:
- geom_changed (True/False): True iff geometry type changed compared to the extracted geometry type.
"""
# Only apply for certain asset types
if asset_type not in ['Education', 'Healthcare', 'Telecom', 'Power', 'Gas', 'Oil', "Airports", "Ports"]:
return features
# Work on a copy (avoid chained assignment)
features = features.copy()
# --- ROBUST: store original geometry type BEFORE any conversion ---
features["_geom_type_orig"] = features.geometry.geom_type
features = _apply_force_geom_rules(features, asset_type)
# Define object types that should NOT be converted for each asset type
preserve_geometry = {
'Power': {
'line': 'LineString', # Should always remain as lines
'tower': 'Point', # Should always remain as points
'pole': 'Point', # Should always remain as points
'catenary_mast': 'Point',
'transformer': 'Point',
'switch' : 'Point',
'portal': 'Point', ## Should always remain as points
'cable': 'LineString', # Should always remain as lines
'minor_line': 'LineString' # Should always remain as lines
},
'Gas': {
'pipeline': 'LineString' # Should always remain as lines
},
'Airports': {
'runway': 'LineString'
},
'Oil': {
'pipeline': 'LineString' # Should always remain as lines
},
'Telecom': {
'mast': 'Point', # Should always remain as points
'communications_tower': 'Point' # Should always remain as points
},
}
# Get the preserve list for this asset type
preserve_list = preserve_geometry.get(asset_type, {})
# Add geometry type information (working type, used for logic below)
features['geom_type'] = features.geometry.geom_type
# Create a mask for features that should preserve their geometry
preserve_mask = pd.Series(False, index=features.index)
for obj_type, geom_type in preserve_list.items():
type_mask = (features['object_type'] == obj_type) & (features['geom_type'] == geom_type)
preserve_mask = preserve_mask | type_mask
# Get polygon features to calculate median areas (only for non-preserved features)
polygon_features = features.loc[
(~preserve_mask) & features.geom_type.isin(['Polygon', 'MultiPolygon'])
].to_crs(3035)
# If no polygon features exist, return original features (but still add geom_changed=False)
if len(polygon_features) == 0:
features["geom_changed"] = False
features = features.drop(['geom_type', '_geom_type_orig'], axis=1)
return features
polygon_features['square_m2'] = polygon_features.area
# Calculate median area by object type
square_m2_object_type = polygon_features[['object_type', 'square_m2']].groupby('object_type').median()
# Default area if median cannot be calculated (1000 sq meters ~ small building)
default_area = 1000
# Find object types that have mixed geometries (linestrings + polygons / points + polygons)
non_preserved_features = features[~preserve_mask]
mixed_geom_types = non_preserved_features.groupby(['object_type', 'geom_type']).size().unstack().fillna(0)
# ----------------------------
# Convert LineStrings -> Polygons (ONLY if that object_type also has polygons)
# ----------------------------
linestrings_to_polygonize = []
if 'LineString' in mixed_geom_types.columns and any(col in mixed_geom_types.columns for col in ['Polygon', 'MultiPolygon']):
for obj_type in mixed_geom_types.index:
# Skip if this object type should be preserved as LineString
if obj_type in preserve_list and preserve_list[obj_type] == 'LineString':
continue
line_count = mixed_geom_types.loc[obj_type, 'LineString'] if 'LineString' in mixed_geom_types.columns else 0
poly_count = sum(
mixed_geom_types.loc[obj_type, col]
for col in ['Polygon', 'MultiPolygon'] if col in mixed_geom_types.columns
)
if line_count > 0 and poly_count > 0:
linestrings_to_polygonize.append(obj_type)
if linestrings_to_polygonize:
print(f"Converting linestrings to polygons for {asset_type}: {linestrings_to_polygonize}")
all_linestrings_to_polygonize = features.loc[
(features.object_type.isin(linestrings_to_polygonize)) &
(features.geom_type == 'LineString') &
(~preserve_mask)
]
if len(all_linestrings_to_polygonize) > 0:
def polygonize_linestring(linestring):
try:
if getattr(linestring, "is_closed", False):
return shapely.geometry.Polygon(linestring)
else:
return linestring.buffer(0.0001)
except Exception:
return linestring.buffer(0.0001)
new_geometries = all_linestrings_to_polygonize.geometry.apply(polygonize_linestring).values
features.loc[
(features.object_type.isin(linestrings_to_polygonize)) &
(features.geom_type == 'LineString') &
(~preserve_mask),
'geometry'
] = new_geometries
# ----------------------------
# Convert Points -> Polygons (ONLY if that object_type also has polygons)
# ----------------------------
points_to_polygonize = []
if 'Point' in mixed_geom_types.columns and any(col in mixed_geom_types.columns for col in ['Polygon', 'MultiPolygon']):
for obj_type in mixed_geom_types.index:
# Skip if this object type should be preserved as Point
if obj_type in preserve_list and preserve_list[obj_type] == 'Point':
continue
point_count = mixed_geom_types.loc[obj_type, 'Point'] if 'Point' in mixed_geom_types.columns else 0
poly_count = sum(
mixed_geom_types.loc[obj_type, col]
for col in ['Polygon', 'MultiPolygon'] if col in mixed_geom_types.columns
)
if point_count > 0 and poly_count > 0:
points_to_polygonize.append(obj_type)
if points_to_polygonize:
all_assets_to_polygonize = features.loc[
(features.object_type.isin(points_to_polygonize)) &
(features.geom_type == 'Point') &
(~preserve_mask)
].to_crs(3035)
if len(all_assets_to_polygonize) > 0:
print(f"Converting {len(all_assets_to_polygonize)} points to polygons for {asset_type}: {points_to_polygonize}")
def polygonize_point_per_asset(asset):
# Get area from median per object_type, else default
if asset.object_type in square_m2_object_type.index:
area = square_m2_object_type.loc[asset.object_type].values[0]
else:
area = default_area
buffer_length = np.sqrt(area) / 2
return asset.geometry.buffer(buffer_length, cap_style='square')
new_geometries = all_assets_to_polygonize.apply(
lambda asset: polygonize_point_per_asset(asset), axis=1
).values
# NOTE: keep your original behavior (no CRS back-transform) as in your code
features.loc[
(features.object_type.isin(points_to_polygonize)) &
(features.geom_type == 'Point') &
(~preserve_mask),
'geometry'
] = new_geometries
# --- ROBUST geom_changed flag: compare original vs final geom type ---
features["geom_changed"] = features.geometry.geom_type.ne(features["_geom_type_orig"])
# Drop temporary columns
features = features.drop(['geom_type', '_geom_type_orig'], axis=1)
return features
This dictionary maps the names of European countries to their respective ISO 3166-1 alpha-2 codes. These codes are two-letter country codes defined in the ISO 3166-1 standard.The dictionary keys are country names (str) and values are their ISO 3166-1 alpha-2 codes (str).
EU_code_full = {
"AL": "Albania",
"AT": "Austria",
"BE": "Belgium",
"BG": "Bulgaria",
"CH": "Switzerland",
"CY": "Cyprus",
"CZ": "Czechia",
"DE": "Germany",
"DK": "Denmark",
"EE": "Estonia",
"EL": "Greece",
"ES": "Spain",
"FI": "Finland",
"FR": "France",
"HR": "Croatia",
"HU": "Hungary",
"IE": "Ireland",
"IS": "Iceland",
"IT": "Italy",
"LT": "Lithuania",
"LU": "Luxembourg",
"LV": "Latvia",
"MK": "North Macedonia",
"MT": "Malta",
"NL": "Netherlands",
"NO": "Norway",
"PL": "Poland",
"PT": "Portugal",
"RO": "Romania",
"SE": "Sweden",
"SI": "Slovenia",
"SK": "Slovakia"
}
# List of networks
network_list = [
"Education",
"Healthcare",
"Airports",
"Ports",
"Roadway",
"Railway",
"Telecom",
"Power",
"Gas",
"Oil",
]
Choose country and asset#
# choose country
country = 'ES'
# choose asset_type
asset_type = 'Ports'
pbf_file = "spain-260119.osm"
%%time
osm_path = data_path / f"{pbf_file}.pbf"
features = read_osm_data(osm_path,asset_type=asset_type)
C:\Users\Paraskevi Tsoumani\anaconda3\envs\miraca\Lib\site-packages\pyogrio\raw.py:200: RuntimeWarning: Non closed ring detected. To avoid accepting it, set the OGR_GEOMETRY_ACCEPT_UNCLOSED_RING configuration option to NO
return ogr_read(
CPU times: total: 5min 36s
Wall time: 5min 7s
tqdm.pandas()
# Ensure CRS then work in 3035
if features.crs is None:
features = features.set_crs(4326, allow_override=True)
features_3035 = features.to_crs(3035)
# Geometry harmonization BEFORE LAU clipping (recommended)
features_3035 = convert_mixed_geometries_to_polygons(features_3035, asset_type)
# Keep valid & non-empty before clipping
features_3035 = features_3035[features_3035.geometry.notna() & ~features_3035.geometry.is_empty]
features_3035 = features_3035[features_3035.is_valid].reset_index(drop=True)
# -------------------------------------------------
# Airports: keep ONLY runway lines
# -------------------------------------------------
if asset_type == "Airports":
is_runway = features_3035["object_type"] == "runway"
is_polygon = features_3035.geometry.geom_type.isin(
["Polygon", "MultiPolygon"]
)
features_3035 = features_3035[
~(is_runway & is_polygon)
].reset_index(drop=True)
# -------------------------------------------------
# Gas: keep ONLY pipeline lines (remove polygons)
# -------------------------------------------------
if asset_type == "Gas":
is_pipeline = features_3035["object_type"] == "pipeline"
is_polygon = features_3035.geometry.geom_type.isin(
["Polygon", "MultiPolygon"]
)
features_3035 = features_3035[
~(is_pipeline & is_polygon)
].reset_index(drop=True)
# -------------------------------------------------
# Power: keep ONLY line geometries for power=line
# -------------------------------------------------
if asset_type == "Power":
is_line = features_3035["object_type"] == "line"
is_polygon = features_3035.geometry.geom_type.isin(
["Polygon", "MultiPolygon"]
)
features_3035 = features_3035[
~(is_line & is_polygon)
].reset_index(drop=True)
# -------------------------------------------------
# Oil: keep ONLY pipeline lines (remove pipeline polygons)
# -------------------------------------------------
if asset_type == "Oil":
is_pipeline = features_3035["object_type"] == "pipeline"
is_polygon = features_3035.geometry.geom_type.isin(["Polygon", "MultiPolygon"])
features_3035 = features_3035[~(is_pipeline & is_polygon)].reset_index(drop=True)
# Load admin layers in 3035
LAU = gpd.read_file(LAU_path, engine="pyogrio")
NUTS = gpd.read_file(NUTS_path, engine="pyogrio")
LAU_country_3035 = LAU.loc[LAU.CNTR_CODE == country].to_crs(3035).reset_index(drop=True)
NUTS2_3035 = NUTS.loc[NUTS.LEVL_CODE == 2].to_crs(3035)[["NUTS_ID", "geometry"]].reset_index(drop=True)
# 1) Find asset-LAU pairs (no cutting yet)
pairs = gpd.sjoin(
features_3035,
LAU_country_3035[["GISCO_ID", "CNTR_CODE", "geometry"]],
predicate="intersects",
how="inner",
).drop(columns=["index_right"]).rename(columns={"GISCO_ID": "LAU"})
# 1) Find asset-LAU pairs (no cutting yet)
pairs = gpd.sjoin(
features_3035,
LAU_country_3035[["GISCO_ID", "CNTR_CODE", "geometry"]],
predicate="intersects",
how="inner",
).drop(columns=["index_right"]).rename(columns={"GISCO_ID": "LAU"})
print("features_3035:", len(features_3035))
print("LAU_country_3035:", len(LAU_country_3035))
print("pairs after sjoin:", len(pairs))
# 2) Attach LAU geometry for clipping
pairs = pairs.merge(
LAU_country_3035[["GISCO_ID", "geometry"]].rename(columns={"GISCO_ID": "LAU", "geometry": "lau_geom"}),
on="LAU",
how="left",
)
# 3) Clip (intersection) -> THIS is the cutting step
pairs["geometry"] = pairs.apply(lambda r: r.geometry.intersection(r.lau_geom), axis=1)
# 4) Clean clipped outputs
pairs = pairs[pairs.geometry.notna() & ~pairs.geometry.is_empty]
pairs = pairs[pairs.is_valid].drop(columns=["lau_geom"]).reset_index(drop=True)
# 5) Assign NUTS2 to clipped pieces (optional but consistent)
pairs_rp = pairs.copy()
pairs_rp["geometry"] = pairs_rp.geometry.representative_point()
nuts_join = gpd.sjoin(
pairs_rp,
NUTS2_3035,
predicate="within",
how="left",
).drop(columns=["index_right"]).rename(columns={"NUTS_ID": "NUTS2"})
pairs["NUTS2"] = nuts_join["NUTS2"].values
# Final output
country_assets = pairs.copy()
# Correct osm_id dtype (NO float)
country_assets["osm_id"] = pd.to_numeric(country_assets["osm_id"], errors="coerce").astype("Int64")
if asset_type == "Gas":
gas_tokens = {"gas", "natural_gas", "lng"}
def norm(s):
return s.astype("string").str.lower()
man_made = norm(country_assets.get("man_made", pd.Series(pd.NA, index=country_assets.index)))
obj_type = norm(country_assets.get("object_type", pd.Series(pd.NA, index=country_assets.index)))
utility = norm(country_assets.get("utility", pd.Series(pd.NA, index=country_assets.index)))
substance = norm(country_assets.get("substance", pd.Series(pd.NA, index=country_assets.index)))
content = norm(country_assets.get("content", pd.Series(pd.NA, index=country_assets.index)))
pipeline = norm(country_assets.get("pipeline", pd.Series(pd.NA, index=country_assets.index)))
# gas-related signal from attributes
gas_signal = (
utility.isin(gas_tokens) |
substance.isin(gas_tokens) |
content.isin(gas_tokens) |
pipeline.isin(gas_tokens)
)
# ALWAYS keep gasometers (even if tags are missing)
keep_gasometer = (man_made == "gasometer") | (obj_type == "gasometer")
# Keep pipelines + storage tanks only if gas signal exists
keep_pipe_or_tank = (
((man_made.isin(["pipeline", "storage_tank"])) | (obj_type.isin(["pipeline", "storage_tank"])))
& gas_signal
)
country_assets = country_assets[(keep_gasometer | keep_pipe_or_tank)].reset_index(drop=True)
# Road grouping only for Roadway
if asset_type == "Roadway":
ROAD_GROUPS = {
"Motorways and Trunks": {"motorway", "motorway_link", "trunk", "trunk_link"},
"Primary Roads": {"primary", "primary_link"},
"Secondary roads": {"secondary", "secondary_link"},
"Tertiary roads": {"tertiary", "tertiary_link"},
"Other roads": {"residential", "unclassified", "track", "road"},
}
# reverse dictionary
road_group_map = {v: k for k, vals in ROAD_GROUPS.items() for v in vals}
country_assets["road_class"] = country_assets["object_type"].map(road_group_map)
country_assets["road_class"] = country_assets["road_class"].fillna("Other roads")
# -----------------------------------------
# FINAL CLEANING for Telecom (AFTER clipping)
# -----------------------------------------
if asset_type == "Telecom":
def norm(s):
return s.astype("string").str.lower()
man_made = norm(country_assets.get("man_made", pd.Series(pd.NA, index=country_assets.index)))
tower_type = norm(country_assets.get("tower:type", pd.Series(pd.NA, index=country_assets.index)))
communication = norm(country_assets.get("communication", pd.Series(pd.NA, index=country_assets.index)))
telecom_mask = (
man_made.isin(["mast", "communications_tower"]) |
tower_type.isin(["communication", "telecommunication"]) |
communication.isin(["communication", "mobile_phone", "telecommunication"])
)
country_assets = country_assets[telecom_mask].reset_index(drop=True)
if asset_type == "Oil":
oil_tokens = {"oil", "crude_oil", "petroleum", "diesel", "fuel_oil", "fuel"}
def norm(s):
return s.astype("string").str.lower()
man_made = norm(country_assets.get("man_made", pd.Series(pd.NA, index=country_assets.index)))
obj_type = norm(country_assets.get("object_type", pd.Series(pd.NA, index=country_assets.index)))
substance = norm(country_assets.get("substance", pd.Series(pd.NA, index=country_assets.index)))
content = norm(country_assets.get("content", pd.Series(pd.NA, index=country_assets.index)))
pipeline = norm(country_assets.get("pipeline", pd.Series(pd.NA, index=country_assets.index)))
industrial = norm(country_assets.get("industrial", pd.Series(pd.NA, index=country_assets.index)))
amenity = norm(country_assets.get("amenity", pd.Series(pd.NA, index=country_assets.index)))
oil_signal = (
substance.isin(oil_tokens) |
content.isin(oil_tokens) |
pipeline.isin(oil_tokens) |
industrial.isin({"refinery", "oil"}) |
amenity.isin({"fuel"})
)
keep_core = (
(man_made.isin(["petroleum_well", "oil_refinery"])) |
(obj_type.isin(["petroleum_well", "oil_refinery"]))
)
keep_fuel = (amenity == "fuel")
keep_pipe_or_tank = (
((man_made.isin(["pipeline", "storage_tank"])) | (obj_type.isin(["pipeline", "storage_tank"])))
& oil_signal
)
country_assets = country_assets[(keep_core | keep_pipe_or_tank | keep_fuel)].reset_index(drop=True)
Converting 29 points to polygons for Ports: ['harbour', 'pier']
features_3035: 592
LAU_country_3035: 8132
pairs after sjoin: 415
# only for roads/railway: add corridor values
if asset_type.lower() in ["roadway", "railway"]:
tent_path = TENT_roads_path if asset_type.lower() == "roadway" else TENT_rail_path
add_path = pd.read_parquet(tent_path)[["osm_way_id", "CORRIDORS"]]
corridor_dict = dict(zip(add_path["osm_way_id"], add_path["CORRIDORS"]))
country_assets["CORRIDOR"] = country_assets["osm_id"].map(corridor_dict)
country_assets.head()
| osm_id | geometry | object_type | landuse | man_made | name | geom_changed | LAU | CNTR_CODE | NUTS2 | |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6760224505 | POLYGON ((3041987.865 1774299.493, 3041973.804... | pier | None | pier | None | True | ES_14067 | ES | ES61 |
| 1 | 11298782309 | POLYGON ((3107511.871 2384107.640, 3107497.811... | pier | None | pier | None | True | ES_33056 | ES | ES12 |
| 2 | 6751917251 | POLYGON ((3081402.344 1778084.530, 3081388.283... | pier | None | pier | None | True | ES_23005 | ES | ES61 |
| 3 | 3783769293 | POLYGON ((2981119.580 2195491.509, 2981105.520... | pier | None | pier | None | True | ES_49240 | ES | ES41 |
| 4 | 3814450662 | POLYGON ((3149945.228 2275522.209, 3149931.167... | pier | None | pier | Embarcadero "Marqués de la Ensenada" | True | ES_34083 | ES | ES41 |
# Base output directory
base_out = Path("Exposure_files")
# Create folder named after the asset type
asset_folder = base_out / asset_type
asset_folder.mkdir(parents=True, exist_ok=True)
# Save
country_assets.to_parquet(asset_folder / f"{asset_type}_{country}.parquet")
Statistics and visualization#
tmp = country_assets.copy()
tmp["geometry_type"] = tmp.geometry.geom_type.astype("string")
tmp["object_type"] = tmp["object_type"].astype("string")
# 1) Counts table: object_type x geometry_type
counts = (
tmp.groupby(["object_type", "geometry_type"])
.size()
.reset_index(name="count")
.sort_values(["object_type", "geometry_type"])
)
# Add percentages (overall share)
total_n = counts["count"].sum()
counts["pct_total"] = (counts["count"] / total_n * 100).round(2)
print(f"\n=== {asset_type} / {country} ===")
print(f"Total features: {len(tmp)}")
print("\nObject type × Geometry type counts:")
display(counts)
=== Ports / ES ===
Total features: 415
Object type × Geometry type counts:
| object_type | geometry_type | count | pct_total | |
|---|---|---|---|---|
| 0 | harbour | MultiPolygon | 8 | 1.93 |
| 1 | harbour | Polygon | 51 | 12.29 |
| 2 | pier | MultiPolygon | 2 | 0.48 |
| 3 | pier | Polygon | 354 | 85.30 |
# Choose the column to summarize
if asset_type == "Roadway":
# make sure road_class exists (created earlier in your pipeline)
if "road_class" not in tmp.columns:
raise ValueError("road_class not found. Run the road grouping block before plotting.")
plot_col = "road_class"
else:
plot_col = "object_type"
tmp[plot_col] = tmp[plot_col].astype("string")
obj_counts = tmp[plot_col].value_counts(dropna=False)
obj_counts.index = obj_counts.index.fillna("Unknown")
def autopct_threshold(pct):
return f"{pct:.1f}%" if pct >= 2 else ""
fig, ax = plt.subplots(figsize=(5.5,4))
wedges, texts, autotexts = ax.pie(
obj_counts.values,
labels=obj_counts.index,
autopct=autopct_threshold,
startangle=140,
wedgeprops={"edgecolor": "black", "linewidth": 0.3},
textprops={"fontsize": 9},
)
for autotext in autotexts:
autotext.set_fontsize(9)
autotext.set_color("black")
ax.legend(
wedges,
obj_counts.index,
loc="center left",
bbox_to_anchor=(1, 0.5),
markerscale=0.8,
handlelength=0.9,
handleheight=0.9,
fontsize=9,
)
ax.axis("equal")
# Save in country folder
stats_folder = Path("Exposure_files") / "_stats" / country
stats_folder.mkdir(parents=True, exist_ok=True)
# Output name: use road_class in filename for clarity
suffix = "road_class" if asset_type == "Roadway" else "object_type"
out_png = stats_folder / f"stats_{suffix}_{asset_type}_{country}.png"
plt.savefig(out_png, dpi=300, bbox_inches="tight", transparent=True)
plt.show()
print("Saved:", out_png)
Saved: Exposure_files\_stats\ES\stats_object_type_Ports_ES.png
# QUICK MAP (interactive)
viz = country_assets.copy()
if viz.crs is None:
viz = viz.set_crs(3035, allow_override=True)
viz_4326 = viz.to_crs(4326)
# avoid heavy rendering
MAX_SHOW = 8000
if len(viz_4326) > MAX_SHOW:
viz_4326 = viz_4326.sample(MAX_SHOW, random_state=42)
m = viz_4326.explore(
column="object_type",
tooltip=["object_type", "osm_id", "LAU", "NUTS2"],
legend=True,
)
m